/* 
 * Persistence4J - Simple library for data persistence using java
 * Copyright (c) 2010, Avdhesh yadav.
 * http://www.avdheshyadav.com
 * Contact: avdhesh.yadav@gmail.com
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 * 
 * http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 *
 */

package com.avdheshyadav.p4j.jdbc.dao;

import java.sql.BatchUpdateException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
import org.apache.log4j.Logger;

import com.avdheshyadav.spiderdb.dbmodel.Column;
import com.avdheshyadav.spiderdb.dbmodel.PrimaryKey;
import com.avdheshyadav.p4j.common.DAOException;
import com.avdheshyadav.p4j.jdbc.common.CRUD;
import com.avdheshyadav.p4j.jdbc.dao.util.ConnUtils;
import com.avdheshyadav.p4j.jdbc.dbms.connfactory.DBConnector;
import com.avdheshyadav.p4j.jdbc.dbms.metadata.IMetaDataLoader;
import com.avdheshyadav.p4j.jdbc.dbms.metadata.TableStructure;
import com.avdheshyadav.p4j.jdbc.model.DTO;
import com.avdheshyadav.p4j.jdbc.service.GenericDAO;
import com.avdheshyadav.p4j.jdbc.service.GenericDTO;

/**
 * @author Avdhesh Yadav
 */
public abstract class AbsPersistenceManager
{
	/** */
	static Logger logger = Logger.getLogger(AbsPersistenceManager.class.getName());
	/** */
	public static final String SELECT_KEY="select * from ";
	/** */
	public static final String COUNT_KEY="select count(*) from ";
	/** */
	public static final String DELETE_KEY="delete from ";
	/** */
	public static final String AND = " and ";
	/** */
	protected static final String WHERE = " where ";
	/** */	
	protected int mFetchSize = 10;
	/** */
	public static final String ORDERBY = " order by ";
	/** */
	public static final String ASC = " asc ";
	/** */
	public static final String DESC = " desc";
	/** */
	protected final IMetaDataLoader m_MetaDataLoader;
	/** */
	protected final DBConnector m_Connector;
	/** */
	protected final CRUD m_CRUD;
	/** */
	private RowProcessor rowProcessor = new GenericDTORowProcessor();

	/**
	 * 
	 * @param loader IMetaDataLoader
	 * @param crud CRUD
	 */
	public AbsPersistenceManager(IMetaDataLoader loader, CRUD crud)
	{
		m_MetaDataLoader = loader;
		m_CRUD = crud;
		m_Connector = loader.getDBConnector();
	}


	/**
	 * 
	 * @return IMetaDataLoader
	 */
	public IMetaDataLoader getMetaDataLoader()
	{
		return m_MetaDataLoader;
	}


	/**
	 * 
	 * @return DBConnector
	 */
	public DBConnector getDBConnector()
	{
		return m_Connector;
	}


	/**
	 * 
	 * @param DTO dto
	 * 
	 * @return boolean
	 * 
	 * @throws DAOException
	 */
	public boolean isEntityExists(DTO dto) throws DAOException
	{
		GenericDTO gvo = (GenericDTO)dto;
		Connection conn = null;
		PreparedStatement pstmt = null;
		ResultSet resultSet = null;
		try 
		{
			StringBuilder queryBuilder = new StringBuilder(COUNT_KEY);
			Map<String, Object> data = gvo.getData();
			TableStructure tableStructure = getTableStructure(dto);
			PrimaryKey pk = tableStructure.getPrimaryKey();
			if(pk != null)
			{
				queryBuilder.append(extractPrimaryKeyColumnValues(pk, data, tableStructure).toString());
			}
			else
			{
				queryBuilder.append(extractAllColumnValues(data, tableStructure).toString());
			}
			conn = m_Connector.getConnection();
			pstmt = conn.prepareStatement(queryBuilder.toString());
			resultSet = m_CRUD.read(pstmt);
			if (resultSet.next())
			{
				if(resultSet.getInt(1) > 0) 
					return true;
			}
		} 
		catch (SQLException e) 
		{
			e.printStackTrace();
			throw new DAOException(e.getMessage());
		}
		finally
		{
			ConnUtils.closeQuietly(conn, pstmt, resultSet);
		}
		return false;
	}


	/**
	 * 
	 * @param tableName String
	 * @param params String
	 * @param value String
	 * 
	 * @return int
	 * 
	 * @throws DAOException
	 */
	public int countRows(String tableName, String params, String value)throws DAOException
	{
		String query = "select count(*) from " + tableName;
		if (params != null && value != null && !(params.equals("")))
		{
			query = query + WHERE + params + QueryBuilder.EQUAL + "'" + value + "'";
		}
		int rows = 0;
		Connection connection =  m_Connector.getConnection();
		PreparedStatement pstmt = null;
		ResultSet resultSet = null;
		try
		{
			pstmt = connection.prepareStatement(query);
			resultSet = m_CRUD.read(pstmt);
			if (resultSet.next())
			{
				return resultSet.getInt(1);
			}
		}
		catch (SQLException exception)
		{
			throw new DAOException(exception.getMessage(), exception);
		}
		finally
		{
			ConnUtils.closeQuietly(connection, pstmt, resultSet);
		}
		return rows;
	}


	/**
	 * 
	 * @param dto DTO
	 * 
	 * @throws Exception
	 */
	public void persistEntity(DTO dto)throws DAOException
	{
		Connection connection = null;
		PreparedStatement pstmt = null;
		try
		{	
			TableStructure tableStructure = getTableStructure(dto);
			String query = tableStructure.getInsertQuery();
			connection = m_Connector.getConnection();
			pstmt = connection.prepareStatement(query);
			rowProcessor.copyEntityObjectToPstmt(tableStructure, dto, pstmt);
			m_CRUD.create(pstmt);
		}
		catch (Exception exp)
		{
			logger.error("error persisting entity :" + dto.toString());
			exp.printStackTrace();
			throw new DAOException("Error persisting Entity::::" + exp.getMessage());
		}
		finally
		{
			ConnUtils.closeQuietly(connection, pstmt, null);
		}
	}


	/**
	 * 
	 * @param valueObject DTO
	 * @param whereClause Map<String,Object> 
	 * 
	 * @return DTO
	 * 
	 * @throws DAOException
	 */
	public DTO updateEntity(DTO dto,Map<String,Object> whereClause) throws DAOException
	{
		Connection conn = null;
		PreparedStatement pstmt = null;
		try 
		{
			TableStructure tableStructure = getTableStructure(dto);
			StringBuffer query = new StringBuffer (tableStructure.getUpdateQuery());

			if(whereClause != null)
			{
				StringBuilder st= new StringBuilder("");
				Iterator<Entry<String, Object>> itr = whereClause.entrySet().iterator();
				for(int i = 0 ; i < whereClause.size();i++)
				{
					Entry<String,Object> entry = itr.next();
					if (i > 0)
					{
						st.append(AND);
					} 
					st.append(entry.getKey());
					st.append("='");
					st.append(entry.getValue());
					st.append("'");
				}
				query.append(st.toString());
			}
			logger.debug("query.toString():"+query.toString());
			conn = m_Connector.getConnection();
			pstmt = conn.prepareStatement(query.toString());
			rowProcessor.copyEntityObjectToPstmt(tableStructure, dto, pstmt);
			boolean success = m_CRUD.update(pstmt);
			if(!success)
				throw new DAOException("Unable to update Entity.May be this entity does not exist in database");
		} 
		catch (Exception e)
		{
			logger.error("Error Updating entity :" +dto.getTableName());
			e.printStackTrace();
			throw new DAOException(e.getMessage());
		}
		finally
		{
			ConnUtils.closeQuietly(conn, pstmt, null);
		}
		return dto;
	}


	/**
	 * 
	 * @param dto DTO
	 * 
	 * @throws DAOException
	 */
	public void deleteEntity(DTO dto) throws DAOException
	{
		GenericDTO gvo = (GenericDTO)dto;
		Map<String, Object> data = gvo.getData();
		Connection conn = null;
		try
		{
			TableStructure tableStructure = getTableStructure(dto);
			PrimaryKey pk = tableStructure.getPrimaryKey();
			StringBuilder queryBuilder = new StringBuilder(DELETE_KEY);
			if(pk != null)
			{
				extractPrimaryKeyColumnValues(pk, data, tableStructure);
				queryBuilder.append(extractPrimaryKeyColumnValues(pk, data, tableStructure).toString());
			}
			else
			{
				queryBuilder.append(extractAllColumnValues(data, tableStructure));
			}
			logger.debug("queryBuffer:"+queryBuilder);
			conn = m_Connector.getConnection();
			boolean success = m_CRUD.delete(conn, queryBuilder.toString());
			if(!success)
				throw new DAOException("Unable to Delete Entity.May be this entity does not exist in database");
		} 
		catch(Exception exp)
		{
			exp.printStackTrace();
			throw new DAOException("Exception occured During Delete:"+exp.getMessage());
		}
		finally
		{
			ConnUtils.closeQuietly(conn, null, null);
		}
	}


	/**
	 * 
	 * @param tableString String
	 * @param value Object []
	 * 
	 * @return DTO
	 * 
	 * @throws DAOException
	 */	
	public DTO findByPrimaryKey(String tableString , Object [] value) throws DAOException
	{
		TableStructure tableStructure = getTableStructure(tableString);
		PrimaryKey primaryKey = tableStructure.getPrimaryKey();
		Validate.notNull(primaryKey, "Table does not have any Any Primary Key. Define Primary key for the table: " + tableString);

		StringBuilder queryBuilder = new StringBuilder();
		queryBuilder.append(SELECT_KEY);
		queryBuilder.append(tableStructure.getFullTableName());
		queryBuilder.append(WHERE);
		Set<Column> columns  = primaryKey.getColumns();
		StringBuilder pKeys = new StringBuilder();

		for(Column column :columns)
		{
			pKeys.append(column.getColumnName() + ",");
		}
		String keys = pKeys.toString();
		keys = keys.substring(0, keys.length() - 1);
		String sequesnce[] = StringUtils.split(keys, ",");
		if(sequesnce != null && sequesnce.length == value.length)
		{
			for(int i = 0; i < sequesnce.length ; i++)
			{
				queryBuilder.append(" " + sequesnce[i]+"='");
				queryBuilder.append(value[i] + "'");
				if(i != sequesnce.length -1 )
					queryBuilder.append(AND);
			}
		}
		logger.debug("queryBuilder.toString():"+queryBuilder.toString());

		Connection conn = null;
		PreparedStatement pstmt = null;
		ResultSet resultSet = null;
		try 
		{
			conn = m_Connector.getConnection();
			pstmt = conn.prepareStatement(queryBuilder.toString(),ResultSet.TYPE_SCROLL_INSENSITIVE,ResultSet.CONCUR_READ_ONLY);
			resultSet = m_CRUD.read(pstmt);
			List<DTO> retVals = (List<DTO>) rowProcessor.toBeanList(tableStructure, resultSet, GenericDAO.initialPosition, GenericDAO.ALL_ROWS);
			if(retVals != null && retVals.size() > 0)
			{
				for(DTO v : retVals)
				{
					return v;
				}
			}
		}
		catch(Exception esxp)
		{
			esxp.printStackTrace();
			throw new DAOException("Error occured during findByPrimaryKey" + esxp.getMessage());
		}
		finally
		{
			ConnUtils.closeQuietly(conn, pstmt, resultSet);
		}
		return null;
	}


	/**
	 *  
	 * @param tableName String
	 * 
	 * @return  List<DTO>
	 */
	public List<DTO> loadALLEntities(String tableString) throws DAOException
	{
		return loadEntities(tableString, null, null, GenericDAO.initialPosition, GenericDAO.ALL_ROWS);
	}


	/**
	 * 
	 * @param tableName String
	 * @param orderByParameter String
	 * @param order String
	 * @param startPosition int
	 * @param rows int
	 * 
	 * @return List<DTO>
	 */
	public List<DTO> loadEntities(String tableString, String orderByParameter ,String order , int startPosition , int rows) throws DAOException
	{
		List<DTO> retVals = null;
		Connection conn = null;
		try
		{		
			TableStructure tableStructure = getTableStructure(tableString);
			StringBuffer query = new StringBuffer();
			query.append(SELECT_KEY);
			query.append(tableStructure.getFullTableName());
			//code for ordering the values
			if(orderByParameter != null && order !=null )
			{
				query.append(ORDERBY);
				query.append(orderByParameter);
				query. append(" ");
				query.append(order);
			}
			//end of ordering code
			conn = m_Connector.getConnection();
			retVals = (List<DTO>)readData(conn, query.toString(), tableStructure, startPosition, rows);
			logger.debug("returned value:"+retVals.size());
		} 
		catch(Exception exp)
		{
			exp.printStackTrace();
			throw new DAOException("Error occured during findAllEntities" + exp.getMessage());
		}
		finally
		{
			ConnUtils.closeQuietly(conn, null, null);
		}
		return retVals;
	}


	/**
	 * 
	 * @param tableString String
	 * @param query String
	 * @param startPosition
	 * @param rows int
	 * 
	 * @return List<DTO>
	 * 
	 * @throws DAOException
	 */
	public List<DTO> findByQuery(String tableString, String query, int startPosition,int rows) throws DAOException
	{
		logger.debug("tableName:"+tableString + " query:"+query);
		List<DTO> retVals = null;
		Connection conn = null;
		try
		{
			TableStructure tableStructure = getTableStructure(tableString);
			conn = m_Connector.getConnection();
			retVals = (List<DTO>)readData(conn, query, tableStructure, startPosition, rows);
		}
		catch(Exception exp)
		{
			exp.printStackTrace();
			throw new DAOException("DAO Exception Occured when executing the query:"+query);
		}
		finally
		{
			ConnUtils.closeQuietly(conn, null, null);
		}
		return retVals;
	}


	/**
	 * 
	 * @param tableName String
	 * @param params String[] 
	 * @param values String[] 
	 * @param orderByField String
	 * @param ascDesc String
	 * @param startPosition int
	 * @param rows int
	 * 
	 * @return  List<DTO>
	 * 
	 * @throws DAOException
	 */
	public List<DTO> find(String tableString,String[] params, String[] values, String orderByField,String ascDesc, int startPosition, int rows) throws DAOException
	{
		List<DTO> retVals = null;
		QueryBuilder queryBuilder = new QueryBuilder();
		String query = queryBuilder.buildQuery(tableString, QueryBuilder.SELECT, params, values, orderByField, ascDesc);
		Connection connection = m_Connector.getConnection();
		PreparedStatement pstmt = null;
		ResultSet resultSet = null;
		try
		{
			pstmt = connection.prepareStatement(query, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
			resultSet = m_CRUD.read(pstmt);
			TableStructure tableStructure = getTableStructure(tableString);
			retVals = (List<DTO>) rowProcessor.toBeanList(tableStructure, resultSet, startPosition, rows);
		}
		catch (SQLException exception)
		{
			exception.printStackTrace();
			throw new DAOException(exception.getMessage(), exception);
		}
		finally
		{
			ConnUtils.closeQuietly(connection, pstmt, resultSet);
		}
		return retVals;
	}


	/**
	 * 
	 * @param tableString String
	 * @param param String
	 * @param value String
	 * @param orderByField String
	 * @param ascDesc String
	 * @param startPosition int
	 * @param rows int
	 * 
	 * @return List<DTO>
	 * 
	 * @throws DAOException
	 */
	public List<DTO> findByAttribute(String tableString, String param, String value, String orderByField, String ascDesc, int startPosition, int rows) throws DAOException
	{
		List<DTO> retVals = null;
		String[] params = {param};
		String[] values = {value};
		QueryBuilder queryBuilder = new QueryBuilder();
		String query = queryBuilder.buildQuery(tableString, QueryBuilder.SELECT, params, values, orderByField, ascDesc);
		Connection connection = m_Connector.getConnection();
		PreparedStatement pstmt = null;
		ResultSet resultSet = null;
		try
		{
			pstmt = connection.prepareStatement(query, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
			resultSet = m_CRUD.read(pstmt);
			TableStructure tableStructure = getTableStructure(tableString);
			retVals = (List<DTO>) rowProcessor.toBeanList(tableStructure, resultSet, startPosition, rows);
		}
		catch (SQLException exception)
		{
			exception.printStackTrace();
			throw new DAOException(exception.getMessage(), exception);
		}
		finally
		{
			ConnUtils.closeQuietly(connection, pstmt, resultSet);
		}
		return retVals;
	}


	/**
	 * 
	 * @param dtos List<DTO>
	 * 
	 * @return Map<Integer, DTO>
	 * 
	 * @throws DAOException
	 */
	public Map<Integer, DTO> batchInsert(List<DTO> dtos) throws DAOException
	{		
		Map<Integer, DTO> failToInsertMap = new HashMap<Integer, DTO>();
		int insertCounts[];
		Connection conn = null;
		PreparedStatement pstmt = null;
		DTO dto = dtos.get(0);
		try
		{
			TableStructure tableStructure = getTableStructure(dto);
			conn = m_Connector.getConnection();
			String query = tableStructure.getInsertQuery();
			pstmt = conn.prepareStatement(query);

			DatabaseMetaData databaseMetaData = conn.getMetaData();
			if(databaseMetaData.supportsBatchUpdates())
			{	
				for(int i = 0; i < dtos.size(); i++ )
				{
					dto = dtos.get(i);
					rowProcessor.copyEntityObjectToPstmt(tableStructure, dto, pstmt);
					pstmt.addBatch();
				}
				try 
				{
					insertCounts = m_CRUD.batchInsert(pstmt);
				} 
				catch (BatchUpdateException batchException) 
				{
					logger.error("Batch Exception occured During the insertion of the batch:"+batchException.getMessage());
					insertCounts = batchException.getUpdateCounts();
					for (int i= 0; i< insertCounts.length; i++) 
					{
						if(insertCounts[i] == PreparedStatement.EXECUTE_FAILED)
						{
							//failed to execute
							failToInsertMap.put(i, dtos.get(i));
						}
					}
				}
			}
			else
			{
				for(int i = 0; i < dtos.size(); i++ )
				{
					dto = dtos.get(i);
					rowProcessor.copyEntityObjectToPstmt(tableStructure, dto, pstmt);
					try 
					{
						m_CRUD.create(pstmt);
					} 
					catch (SQLException e) 
					{
						e.printStackTrace();
						failToInsertMap.put(i, dtos.get(i));
					}
				}
			}
		}
		catch (Exception exp)
		{
			logger.error("error persisting batch of entities :");
			exp.printStackTrace();
			throw new DAOException("Error persisting Entity::::" + exp.getMessage());
		}
		finally
		{
			ConnUtils.closeQuietly(conn, pstmt, null);
		}
		return failToInsertMap;
	}


	/**
	 * This method is here to save the number of connections.And decrease the over head of creating and closing a great deal of
	 * Connections.
	 * 
	 * @param tableString String
	 * @param queries {@link List}
	 * 
	 * @return List<? extends Object> 
	 */
	public List<? extends DTO> executeQueries(String tableString, List<String> queries) throws DAOException
	{
		logger.debug("inside executeQueries:"+tableString + " Queries:"+ queries);
		List<DTO> queriesData = new ArrayList<DTO>();
		Connection conn = m_Connector.getConnection();
		try 
		{
			TableStructure tableStructure = getTableStructure(tableString);
			for(String query : queries)
			{
				List<DTO> retVals = (List<DTO>)readData(conn, query, tableStructure , GenericDAO.initialPosition , GenericDAO.ALL_ROWS);
				queriesData.addAll(retVals);
			}
		} 
		catch (Exception e) 
		{
			throw new DAOException(e.getMessage());
		}
		finally
		{
			ConnUtils.closeQuietly(conn, null, null);
		}
		return queriesData;
	}


	/**
	 * 
	 * @param query String
	 * 
	 * @return boolean
	 */
	public boolean executeQuery(String query) throws DAOException
	{
		logger.debug("inside query: " + query);
		Connection conn = m_Connector.getConnection();
		PreparedStatement pstmt = null;
		boolean success = false;
		try 
		{
			pstmt = conn.prepareStatement(query);
			success = m_CRUD.update(pstmt);
		} 
		catch (Exception e) 
		{
			throw new DAOException(e.getMessage());
		}
		finally
		{
			ConnUtils.closeQuietly(conn, pstmt, null);
		}
		return success;
	}


	/**
	 * 
	 * @param query int
	 * @param tableName  String
	 * @param startPosition int
	 * @param rows int
	 * 
	 * @return int
	 * 
	 * @throws DAOException
	 */
	private List<DTO> readData(Connection conn, String query, TableStructure tableStructure, int startPosition, int rows)throws DAOException
	{
		List<DTO> retVals = null;
		PreparedStatement pstmt = null;
		ResultSet resultSet = null;
		try
		{		
			pstmt = conn.prepareStatement(query,ResultSet.TYPE_SCROLL_INSENSITIVE,ResultSet.CONCUR_READ_ONLY);
			resultSet = m_CRUD.read(pstmt);
			retVals = (List<DTO>) rowProcessor.toBeanList(tableStructure, resultSet, startPosition, rows);
		} 
		catch(Exception exp)
		{
			exp.printStackTrace();
			throw new DAOException(exp.getMessage());
		}
		finally
		{
			ConnUtils.closeQuietly(null, pstmt, resultSet);
		}
		return retVals;
	}


	/**
	 * 
	 * @param dto DTO
	 * 
	 * @return TableStructure
	 */
	private TableStructure getTableStructure(DTO dto)
	{
		return getTableStructure(dto.getTableName());
	}


	/**
	 * 
	 * @param tableString String
	 * 
	 * @return TableStructure
	 */
	private TableStructure getTableStructure(String tableString)
	{
		String schemaName = getSchemaName(tableString);
		String tableName = getTableName(tableString);
		TableStructure tableStructure = m_MetaDataLoader.getTableStructure(schemaName,tableName);
		Validate.notNull(tableStructure, "TableStructure Can not be Null for TableName:"+tableString);
		return tableStructure;
	}


	/**
	 * 
	 * @param tableString
	 * 
	 * @return String
	 */
	private String getSchemaName(String tableString)
	{
		tableString = tableString.toLowerCase();
		try 
		{
			String stringArray[] = StringUtils.split(tableString,".");
			return stringArray[0];
		} 
		catch (ArrayIndexOutOfBoundsException e)
		{
			throw new IllegalStateException("please provide table String of this format(schemaname.tablename).your tableString is:"+tableString);
		}
	}


	/**
	 * 
	 * @param tableString String
	 * 
	 * @return String
	 */
	private String getTableName(String tableString)
	{
		tableString = tableString.toLowerCase();
		try 
		{	
			String stringArray[] = StringUtils.split(tableString,".");
			return stringArray[1];
		} 
		catch (ArrayIndexOutOfBoundsException e) 
		{
			throw new IllegalStateException(" please provide table String of this format(schemaname.tablename).your tableString is:"+tableString);
		}
	}


	/**
	 *  
	 * @param dto DTO
	 * @param tableStructure TableStructure
	 * 
	 * @return StringBuilder
	 */
	private StringBuilder extractAllColumnValues(Map<String, Object> data , TableStructure tableStructure)
	{
		StringBuilder sb = new StringBuilder(tableStructure.getFullTableName() + WHERE);
		List<Column> columns = tableStructure.getColumns();
		for(int i = 0; i< columns.size(); i++)
		{
			String columnName = columns.get(i).getColumnName();
			columnName = columnName.toLowerCase();
			Object obj = data.get(columnName);
			sb.append(columnName);
			sb.append("='");
			sb.append(obj + "'");
			if(i != columns.size() -1 )
				sb.append(" and ");
		}
		return sb;
	}


	/**
	 * 
	 * @param pk PrimaryKey
	 * @param data Map<String, Object>
	 * @param tableStructure TableStructure
	 * @return
	 */
	private StringBuffer extractPrimaryKeyColumnValues(PrimaryKey pk, Map<String, Object> data, TableStructure tableStructure)
	{
		StringBuffer queryBuilder = new StringBuffer();
		queryBuilder.append(tableStructure.getFullTableName());
		queryBuilder.append(WHERE);
		Set<Column> columns = pk.getColumns();
		if(columns != null)
		{
			StringBuffer pKeys = new StringBuffer();
			for(Column column :columns)
			{
				pKeys.append(column.getColumnName() + ",");
			}
			String keys = pKeys.toString();
			keys = keys.substring(0, keys.length() - 1);
			String sequesnce[] = StringUtils.split(keys, ",");
			for(int i = 0; i< sequesnce.length ; i++)
			{
				sequesnce[i] = sequesnce[i].toLowerCase();
				queryBuilder.append(" " + sequesnce[i]+"='");
				queryBuilder.append(data.get(sequesnce[i]) + "'");
				if(i != sequesnce.length -1 )
					queryBuilder.append(AND);
			}
		}
		return queryBuilder;
	}


	/**
	 * 
	 * @param resultSet ResultSet
	 * @param startPosition int
	 * 
	 * @throws SQLException
	 */
	protected void setResultSetAttribute(ResultSet resultSet, int startPosition)throws SQLException
	{
		resultSet.setFetchSize(mFetchSize);
		resultSet.relative(startPosition - 1);
	}
}